// Copyright (c) 2019 Andy Pan
// Copyright (c) 2018 Joshua J Baker
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package gnet

import (
	"fmt"
	"net"
	"os"
	"runtime"
	"sync"
	"sync/atomic"
	"time"
	// "unsafe"

	"gitee.com/wkkcool/gnet/internal"
	"gitee.com/wkkcool/gnet/internal/netpoll"
	"gitee.com/wkkcool/gnet/internal/socket"
	"golang.org/x/sys/unix"

	goconcurrentqueue "gitee.com/wkkcool/gnet/fifo-queue"
)
// FIFO is an alias for the queue type provided by the fifo-queue package,
// re-exported so callers of this package do not need to import it directly.
type FIFO = goconcurrentqueue.FIFO

var (
	// acceptLock serializes the load-balancer selection performed in
	// (*EPoller).AcceptNewConnection.
	acceptLock sync.Mutex

	// myEpoll is the package-wide EPoller. It is assigned by StartEpoll and
	// used by the package-level AcceptNewConnection.
	myEpoll *EPoller
)
// Logger is the leveled, printf-style logging contract used by the server.
type Logger interface {
	// Debugf writes a formatted message at DEBUG level.
	Debugf(format string, args ...interface{})

	// Infof writes a formatted message at INFO level.
	Infof(format string, args ...interface{})

	// Warnf writes a formatted message at WARN level.
	Warnf(format string, args ...interface{})

	// Errorf writes a formatted message at ERROR level.
	Errorf(format string, args ...interface{})

	// Fatalf writes a formatted message at FATAL level.
	Fatalf(format string, args ...interface{})
}
// mylog is a no-op logger: every method discards its arguments.
type mylog struct{}

// Debugf discards the message.
func (mylog) Debugf(string, ...interface{}) {}

// Infof discards the message.
func (mylog) Infof(string, ...interface{}) {}

// Warnf discards the message.
func (mylog) Warnf(string, ...interface{}) {}

// Errorf discards the message.
func (mylog) Errorf(string, ...interface{}) {}

// Fatalf discards the message; it does not terminate the process.
func (mylog) Fatalf(string, ...interface{}) {}

// mygnet lists the extra per-connection operations this file adds on top of
// the base connection type.
type mygnet interface {
	// IsLogin reports whether the connection has been marked as logged in.
	IsLogin() bool
	// Login marks the connection as logged in.
	Login()
	// LoginOn links the connection to a peer dialed at addr (if it has none)
	// and marks it as logged in.
	LoginOn(addr string) error

	// Readfd reads directly from the connection's file descriptor.
	Readfd(buff []byte) (i int, err error)
	// Writefd writes directly to the connection's file descriptor.
	Writefd(buff []byte) (i int, err error)

	// WriteNN, ReadNN, ShiftNN, BufferLengthN and ResetBufferN operate on the
	// connection's secondary inbound buffer.
	WriteNN([]byte) (size int, err error)
	ReadNN(n int) (size int, buf []byte)
	ShiftNN(n int) (size int)
	BufferLengthN() int
	ResetBufferN()

	// GetList and SetList access an opaque user value stored on the connection.
	GetList() interface{}
	SetList(v interface{})

	// Trigger schedules job on the poller of the connection's event loop.
	Trigger(job func(bool, func([]byte) error) error) error

	// GetFIFO returns the queue attached to the connection's link record.
	GetFIFO() *FIFO
	// GetLinkConn, SetLinkConn and CloseLinkConn manage the linked peer
	// connection.
	GetLinkConn() Conn
	SetLinkConn(Conn)
	CloseLinkConn()

	// Remote/local address strings stored on the connection.
	GetRemoteAddr() string
	SetRemoteAddr(str string)
	SetLocalAddr(str string)
	GetLocalAddr() string

	// GetLoopFd returns the index of the owning event loop, or -1.
	GetLoopFd() int
	// GetSqlTag()(sqltag interface{})
}

// EPoller is a listener-less set of event loops used to dial outbound
// connections and have them serviced by the regular event handler.
type EPoller struct {
	// serv is the public Server view; its svr field holds the internal state.
	serv *Server
}

// AcceptNewConnection dials remote over TCP, wraps the resulting socket in a
// connection, assigns it to an event loop chosen by the load balancer and
// registers it for read events on that loop's poller.
//
// It returns the opened connection, or a nil Conn and an error when dialing,
// address lookup, scheduling, or registration on the event loop fails.
//
// NOTE(review): the call blocks until the chosen event loop has executed the
// registration job, so it presumably must not be invoked from that loop's own
// goroutine — confirm against callers.
func (e *EPoller) AcceptNewConnection(remote string) (cout Conn, err error) {
	svr := e.serv.svr
	options := svr.opts

	var sockopts []socket.Option
	if options.ReusePort {
		sockopts = append(sockopts, socket.Option{SetSockopt: socket.SetReuseport, Opt: 1})
	}
	if options.TCPNoDelay == TCPNoDelay {
		sockopts = append(sockopts, socket.Option{SetSockopt: socket.SetNoDelay, Opt: 1})
	}
	if options.SocketRecvBuffer > 0 {
		sockopts = append(sockopts, socket.Option{SetSockopt: socket.SetRecvBuffer, Opt: options.SocketRecvBuffer})
	}
	if options.SocketSendBuffer > 0 {
		sockopts = append(sockopts, socket.Option{SetSockopt: socket.SetSendBuffer, Opt: options.SocketSendBuffer})
	}

	nfd, _, err := SysConnect("tcp", remote, sockopts...)
	if err != nil {
		return nil, err
	}

	// Best effort: probe after 30 seconds without data, then every 30 seconds.
	socket.SetKeepAlive(nfd, 30)

	// The connection is keyed by the local address of the outbound socket.
	sa, err := unix.Getsockname(nfd)
	if err != nil {
		// Nothing owns the descriptor yet, so release it here.
		_ = unix.Close(nfd)
		return nil, os.NewSyscallError("getsockname", err)
	}
	netAddr := socket.SockaddrToTCPOrUnixAddr(sa)

	acceptLock.Lock()
	el := svr.lb.next(netAddr)
	acceptLock.Unlock()

	c := newTCPConn(nfd, el, sa, netAddr)

	// done carries the outcome of the registration job. It is buffered so the
	// event loop never blocks on it, even if this function has already returned.
	done := make(chan error, 1)

	// Register nfd for read events on the chosen loop's poller and open the
	// connection; this must run on the event-loop goroutine.
	err = el.poller.Trigger(func() (err error) {
		defer func() { done <- err }()
		if err = el.poller.AddRead(nfd); err != nil {
			el.loopCloseConn(c, os.NewSyscallError("AddRead", err))
			return
		}
		el.connections[nfd] = c
		if err = el.loopOpen(c); err != nil {
			el.loopCloseConn(c, os.NewSyscallError("loopOpen", err))
		}
		return
	})
	if err != nil {
		// NOTE(review): the job may or may not have been queued when Trigger
		// fails, so the descriptor is deliberately not closed here — confirm
		// against the poller implementation.
		return nil, err
	}

	// Wait for the event loop to finish registration instead of spinning on
	// c.opened, which never became true when registration failed.
	if err = <-done; err != nil {
		return nil, err
	}
	return c, nil
}

// activateEventLoops1 creates numEventLoop event loops, each backed by its own
// poller and without any listener, registers them with the server's load
// balancer and then starts the sub-reactors. The ticker goroutine is started
// for the loop with index 0 when the Ticker option is set.
//
// It returns the first error reported by netpoll.OpenPoller; in that case the
// sub-reactors are not started.
func (e *EPoller) activateEventLoops1(svr *server, numEventLoop int) (err error) {
	for created := 0; created < numEventLoop; created++ {
		p, openErr := netpoll.OpenPoller()
		if openErr != nil {
			return openErr
		}

		el := &eventloop{
			svr:          svr,
			poller:       p,
			packet:       make([]byte, 0x10000),
			connections:  make(map[int]*conn),
			eventHandler: svr.eventHandler,
		}
		svr.lb.register(el)

		// Start the ticker on the first loop only.
		if el.idx == 0 && svr.opts.Ticker {
			go el.loopTicker()
		}
	}

	// Run every registered event loop in the background.
	svr.startSubReactors()
	return nil
}

// NewEpollServe builds a listener-less server from opts, notifies
// eventHandler.OnInitComplete and starts the event loops.
//
// The number of event loops is 1 by default, runtime.NumCPU() when Multicore
// is set, and NumEventLoop when that option is positive. ReadBufferCap
// defaults to 0x4000 and is otherwise rounded up to a power of two.
//
// It returns (nil, nil) when OnInitComplete answers Shutdown. It returns a
// nil EPoller and an error when the load-balancing option is not one of the
// supported values, when the event loops cannot be activated, or when a
// panic occurs during start-up.
func NewEpollServe(eventHandler EventHandler, opts ...Option) (e *EPoller, err error) {
	// Report a start-up panic as an error instead of silently returning a
	// half-initialised EPoller together with a nil error.
	defer func() {
		if r := recover(); r != nil {
			e = nil
			err = fmt.Errorf("gnet: panic while starting epoll server: %v", r)
		}
	}()

	options := loadOptions(opts...)
	if rbc := options.ReadBufferCap; rbc <= 0 {
		options.ReadBufferCap = 0x4000
	} else {
		options.ReadBufferCap = internal.CeilToPowerOfTwo(rbc)
	}

	numEventLoop := 1
	if options.Multicore {
		numEventLoop = runtime.NumCPU()
	}
	if options.NumEventLoop > 0 {
		numEventLoop = options.NumEventLoop
	}

	svr := new(server)
	svr.opts = options
	svr.eventHandler = eventHandler
	switch options.LB {
	case RoundRobin:
		svr.lb = new(roundRobinLoadBalancer)
	case LeastConnections:
		svr.lb = new(leastConnectionsLoadBalancer)
	case SourceAddrHash:
		svr.lb = new(sourceAddrHashLoadBalancer)
	default:
		// Without a balancer the first register call would dereference nil.
		return nil, fmt.Errorf("gnet: unsupported load-balancing option %v", options.LB)
	}

	svr.cond = sync.NewCond(&sync.Mutex{})
	svr.ticktock = make(chan time.Duration, 1)
	svr.logger = mylog{}
	if options.Codec != nil {
		svr.codec = options.Codec
	} else {
		svr.codec = new(BuiltInFrameCodec)
	}

	srv := Server{
		svr:          svr,
		Multicore:    options.Multicore,
		NumEventLoop: numEventLoop,
		TCPKeepAlive: options.TCPKeepAlive,
	}
	e = &EPoller{serv: &srv}

	if svr.eventHandler.OnInitComplete(srv) == Shutdown {
		return nil, nil
	}

	if err := e.activateEventLoops1(svr, numEventLoop); err != nil {
		svr.closeEventLoops()
		return nil, err
	}
	// No blocking here: the caller's main goroutine keeps the process alive.
	return e, nil
}

// SysConnect forwards to socket.SysConnect and returns its results unchanged:
// the connected file descriptor, a socket address and any error.
func SysConnect(proto, addr string, sockopts ...socket.Option) (fd int, sockaddr unix.Sockaddr, err error) {
	fd, sockaddr, err = socket.SysConnect(proto, addr, sockopts...)
	return fd, sockaddr, err
}

// IsLogin reports whether the connection has been marked as logged in.
func (c *conn) IsLogin() bool {
	loggedIn := c.isLogin
	return loggedIn
}
// Login marks the connection as logged in.
func (c *conn) Login() {
	c.isLogin = true
}

// LoginOn makes sure the connection is linked to a peer and marks it as
// logged in. It is a no-op when the connection is already logged in.
//
// When no peer is linked yet, a new connection to addr is opened through the
// package-level AcceptNewConnection, the two connections are linked to each
// other, and this connection's remote address string is set to the peer's
// local address string. If dialing fails the connection context is set to
// "connect exit" and the error is returned.
func (c *conn) LoginOn(addr string) error {
	if c.isLogin {
		return nil
	}

	if c.GetLinkConn() == nil {
		peer, err := AcceptNewConnection(addr)
		if err != nil {
			c.SetContext("connect exit")
			return err
		}
		c.SetLinkConn(peer)
		peer.SetLinkConn(c)
		c.SetRemoteAddr(peer.GetLocalAddr())
	}

	c.Login()
	return nil
}

// GetList returns the opaque value previously stored with SetList.
func (c *conn) GetList() interface{} {
	stored := c.myList
	return stored
}
// SetList stores an opaque value on the connection for later retrieval
// through GetList.
func (c *conn) SetList(v interface{}) {
	c.myList = v
}

// Readfd performs one successful read from the connection's file descriptor
// into buff and returns the number of bytes read.
//
// The read is retried while the kernel reports EAGAIN (no data available yet)
// or EINTR (interrupted by a signal), so the call spins until data arrives.
// A return of (0, nil) means the read returned zero bytes. Any other error is
// returned with a zero count.
func (c *conn) Readfd(buff []byte) (i int, err error) {
	for {
		n, rerr := unix.Read(c.fd, buff)
		switch {
		case rerr == unix.EAGAIN || rerr == unix.EINTR:
			// Transient condition: try again.
			continue
		case rerr != nil:
			return 0, rerr
		default:
			return n, nil
		}
	}
}

// ReadNN returns a copy of up to n bytes from the front of the secondary
// inbound buffer without consuming them. When n is non-positive or larger
// than the buffered length, everything buffered is returned. An empty buffer
// yields (0, nil).
func (c *conn) ReadNN(n int) (size int, buf []byte) {
	if c.newinboundBuffer.IsEmpty() {
		return 0, nil
	}

	if avail := c.newinboundBuffer.Length(); n <= 0 || n > avail {
		n = avail
	}

	// The buffered data may be split in two segments; stitch them together.
	head, tail := c.newinboundBuffer.LazyRead(n)
	buf = make([]byte, n)
	off := copy(buf, head)
	copy(buf[off:], tail)
	return n, buf
}
// ShiftNN passes n to Shift on the secondary inbound buffer and returns n
// unchanged. No bounds checking is done here.
func (c *conn) ShiftNN(n int) (size int) {
	c.newinboundBuffer.Shift(n)
	return n
}
// WriteNN writes buf into the secondary inbound buffer and returns that
// buffer's Write result.
func (c *conn) WriteNN(buf []byte) (size int, err error) {
	size, err = c.newinboundBuffer.Write(buf)
	return size, err
}
// BufferLengthN returns the length reported by the secondary inbound buffer.
func (c *conn) BufferLengthN() int {
	buffered := c.newinboundBuffer.Length()
	return buffered
}
// ResetBufferN resets the secondary inbound buffer.
func (c *conn) ResetBufferN() {
	c.newinboundBuffer.Reset()
}

// Writefd issues a single write of buff on the connection's file descriptor
// and returns the result of that call unchanged; it does not retry.
func (c *conn) Writefd(buff []byte) (i int, err error) {
	i, err = unix.Write(c.fd, buff)
	return i, err
}
// Trigger schedules job on the poller of the connection's event loop. When
// the job runs it receives the connection's opened flag as read at that
// moment and the connection's write function.
func (c *conn) Trigger(job func(bool, func([]byte) error) error) error {
	task := func() error {
		return job(c.opened, c.write)
	}
	return c.loop.poller.Trigger(task)
}

// LinkConn is the per-connection link record: the peer connection, a queue
// and the mutex that guards access to the peer.
type LinkConn struct {
	fifo    *FIFO       // queue attached to the link
	lnkConn Conn        // linked peer connection; nil when unlinked
	locker  *sync.Mutex // guards lnkConn
}

// GetFIFO returns the queue attached to the connection's link record, or nil
// when the connection has no link record (matching the nil handling of
// GetLinkConn, SetLinkConn and CloseLinkConn).
func (c *conn) GetFIFO() *FIFO {
	link := c.linkConn
	if link == nil {
		return nil
	}
	return link.fifo
}

// GetLinkConn returns the linked peer connection, read under the link's
// mutex. It returns nil when the connection has no link record.
func (c *conn) GetLinkConn() (ret Conn) {
	link := c.linkConn
	if link == nil {
		return nil
	}

	link.locker.Lock()
	ret = link.lnkConn
	link.locker.Unlock()
	return ret
}

// SetLinkConn stores con as the linked peer connection under the link's
// mutex. It does nothing when the connection has no link record.
func (c *conn) SetLinkConn(con Conn) {
	link := c.linkConn
	if link == nil {
		return
	}

	link.locker.Lock()
	link.lnkConn = con
	link.locker.Unlock()
}

// CloseLinkConn closes the linked peer connection, if there is one. The peer
// is read under the link's mutex but closed after the mutex is released; the
// link itself is left in place.
func (c *conn) CloseLinkConn() {
	link := c.linkConn
	if link == nil {
		return
	}

	link.locker.Lock()
	peer := link.lnkConn
	link.locker.Unlock()

	if peer == nil {
		return
	}
	peer.Close()
}

// GetRemoteAddr returns the remote address string stored on the connection.
func (c *conn) GetRemoteAddr() string {
	addr := c.remoteString
	return addr
}

// SetRemoteAddr stores str as the connection's remote address string.
func (c *conn) SetRemoteAddr(str string) {
	c.remoteString = str
}

// SetLocalAddr stores str as the connection's local address string.
func (c *conn) SetLocalAddr(str string) {
	c.localString = str
}
// GetLocalAddr returns the local address string stored on the connection.
func (c *conn) GetLocalAddr() string {
	addr := c.localString
	return addr
}

// GetLoopFd returns the index of the event loop that owns the connection, or
// -1 when the connection has no event loop. Despite its name, the value is
// the loop's idx field, not a file descriptor.
func (c *conn) GetLoopFd() int {
	if loop := c.loop; loop != nil {
		return loop.idx
	}
	return -1
}

// FreeBalancer releases this connection's share of the per-host bookkeeping
// kept by an ipAverageLoadBalancer. It is a no-op when the connection has no
// event loop or server, or when the server uses a different balancer.
//
// The counter for the connection's local address string is decremented; when
// the last connection of a host goes away both the counter and the host's
// loop assignment are removed. A host that is not tracked is left untouched,
// so no negative counters are ever created.
//
// NOTE(review): the key used here is c.localString, while next keys by the
// host part of the address — confirm that callers store a bare host there.
func (c *conn) FreeBalancer() {
	tloop := c.loop
	if tloop == nil || tloop.svr == nil {
		return
	}
	lb, ok := tloop.svr.lb.(*ipAverageLoadBalancer)
	if !ok {
		return
	}

	lb.Lock()
	defer lb.Unlock()

	host := c.localString
	n, tracked := lb.num[host]
	if !tracked {
		return
	}
	if n > 1 {
		lb.num[host] = n - 1
		return
	}
	delete(lb.num, host)
	delete(lb.idx, host)
}

// ================================= Implementation of per-host sticky load-balancer =================================

// ipAverageLoadBalancer pins every host to one event loop. A host seen for
// the first time is given the next loop in round-robin order; later
// connections from the same host reuse that loop.
type ipAverageLoadBalancer struct {
	sync.Mutex

	nextLoopIndex int          // loop handed to the next unseen host
	eventLoops    []*eventloop // registered loops, indexed by their idx
	size          int          // number of registered loops

	idx map[string]int // host -> index of the loop assigned to it
	num map[string]int // host -> number of connections counted for it
}
// register appends el to the balancer and stores its position in the list in
// el.idx.
func (lb *ipAverageLoadBalancer) register(el *eventloop) {
	slot := lb.size
	el.idx = slot
	lb.size = slot + 1
	lb.eventLoops = append(lb.eventLoops, el)
}

// next returns the event loop for the host part of netAddr. A host that
// already has a loop assigned gets that same loop and its connection counter
// is incremented; an unseen host is assigned the loop at nextLoopIndex, which
// then advances round-robin.
//
// The bookkeeping maps are created on first use and are updated under the
// balancer's mutex, the same lock FreeBalancer holds when it decrements them.
func (lb *ipAverageLoadBalancer) next(netAddr net.Addr) (el *eventloop) {
	// An address without a port yields an empty host; all such addresses
	// share one entry.
	host, _, _ := net.SplitHostPort(netAddr.String())

	lb.Lock()
	defer lb.Unlock()

	if lb.idx == nil {
		lb.idx = make(map[string]int)
	}
	if lb.num == nil {
		lb.num = make(map[string]int)
	}

	if assigned, ok := lb.idx[host]; ok {
		lb.num[host]++
		return lb.eventLoops[assigned]
	}

	lb.idx[host] = lb.nextLoopIndex
	lb.num[host] = 1
	el = lb.eventLoops[lb.nextLoopIndex]
	if lb.nextLoopIndex++; lb.nextLoopIndex >= lb.size {
		lb.nextLoopIndex = 0
	}
	return el
}

// iterate calls f with the index and value of every registered event loop in
// registration order, stopping as soon as f returns false.
func (lb *ipAverageLoadBalancer) iterate(f func(int, *eventloop) bool) {
	loops := lb.eventLoops
	for i := 0; i < len(loops); i++ {
		if keepGoing := f(i, loops[i]); !keepGoing {
			return
		}
	}
}

// len returns the number of event loops registered with the balancer.
func (lb *ipAverageLoadBalancer) len() int {
	registered := lb.size
	return registered
}

// calibrate atomically adds delta to el's connection counter.
func (lb *ipAverageLoadBalancer) calibrate(el *eventloop, delta int32) {
	counter := &el.connCount
	atomic.AddInt32(counter, delta)
}

// StartEpoll creates the package-wide EPoller with cpuNum event loops (at
// least one), multicore disabled, the given codec and a 60 second TCP
// keep-alive option, and stores the result in myEpoll.
//
// It returns whatever NewEpollServe returns; myEpoll is overwritten with that
// value even when it is nil.
func StartEpoll(cpuNum int, eventHandler EventHandler, codec ICodec) (ep *EPoller, err error) {
	loops := cpuNum
	if loops < 1 {
		loops = 1
	}

	opts := []Option{
		WithMulticore(false),
		WithCodec(codec),
		WithNumEventLoop(loops),
		WithTCPKeepAlive(time.Second * 60),
	}
	myEpoll, err = NewEpollServe(eventHandler, opts...)
	ep = myEpoll
	return ep, err
}

// AcceptNewConnection opens a connection to remote through the package-wide
// EPoller created by StartEpoll.
//
// It returns an error instead of dereferencing a nil pointer when no EPoller
// is available, i.e. StartEpoll has not been called or did not produce one.
func AcceptNewConnection(remote string) (cout Conn, err error) {
	ep := myEpoll
	if ep == nil {
		return nil, fmt.Errorf("gnet: epoll server not started; call StartEpoll first")
	}
	return ep.AcceptNewConnection(remote)
}
